Task: Add Unit Test for Flink-Spark Equality Delete Write #1
Conversation
@TestTemplate
public void testCheckAndGetEqualityFieldIds() {
We can get rid of all of the tests except your Spark + Flink test.
// Assert that only row with id=3 remains in the table
assertThat(actualData).containsExactlyInAnyOrderElementsOf(expectedData);
This is a good test for writing the equality delete using the stream execution environment! I'd also suggest creating a test for an UPSERT case by leveraging the Flink TableEnvironment. This will help you see how the UPSERT leverages an equality delete to replace the value.
You can add a test to TestFlinkCatalogTable.java like this:
sql("CREATE TABLE test_table (id INT, data STRING, PRIMARY KEY(id) NOT ENFORCED) WITH ('format-version'='2', 'write.upsert.enabled'='true')");
sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b'), (3, 'c')");
// Perform upsert operation
sql("INSERT INTO test_table VALUES (2, 'updated_b'), (4, 'd')");
To expand on this, I'd suggest adding some DeleteFile assertions. For example, what would you expect the DeleteFile to look like when we want to delete based on the cases below (see the sketch after the list)?
- one column: id = 1
- all columns: id = 1 and data = 'a'
- range: id > 3 (also, how does Flink get the values greater than 3?)
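A hedged sketch of what such assertions might look like, assuming the test holds an Iceberg Table handle named table and uses the public Snapshot/DeleteFile APIs (Snapshot#addedDeleteFiles, DeleteFile#content, DeleteFile#equalityFieldIds); the single-file expectation is an illustration, not a statement of how the writer actually groups deletes:

// Collect the delete files added by the latest snapshot (illustrative sketch)
Snapshot snapshot = table.currentSnapshot();
List<DeleteFile> deleteFiles = Lists.newArrayList(snapshot.addedDeleteFiles(table.io()));

// Case "one column: id = 1": expect an equality delete keyed only on the id field
assertThat(deleteFiles).isNotEmpty();
DeleteFile deleteFile = deleteFiles.get(0);
assertThat(deleteFile.content()).isEqualTo(FileContent.EQUALITY_DELETES);
assertThat(deleteFile.equalityFieldIds())
    .containsExactly(table.schema().findField("id").fieldId());

// Case "all columns: id = 1 and data = 'a'": equalityFieldIds() should cover both fields.
// Case "range: id > 3": the expectation depends on how the plan resolves the predicate,
// which is exactly what the reviewer asks to check.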
.master("local[*]")
.config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_catalog.type", "hadoop")
.config("spark.sql.catalog.myCatalog.warehouse", "file:///path/to/warehouse")
We should set the same warehouse as the Flink catalog to ensure we are using the same tables.
spark = SparkSession.builder()
    .appName("iceberg-spark")
    .master("local[*]")
    .config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.hadoop_catalog.type", "hadoop")
    .config("spark.sql.catalog.hadoop_catalog.warehouse", CATALOG_EXTENSION.warehouse())
    .getOrCreate();
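With both catalogs pointing at the same warehouse, a minimal read-back check from the Spark side could look like the sketch below; the hadoop_catalog.`default`.test_table identifier and the single-surviving-row expectation mirror the earlier assertion and are assumptions for illustration:

// Read the Flink-written table through the shared catalog and verify that
// rows removed by the equality delete no longer show up on the Spark side
List<org.apache.spark.sql.Row> sparkRows =
    spark.sql("SELECT id, data FROM hadoop_catalog.`default`.test_table ORDER BY id")
        .collectAsList();
assertThat(sparkRows).hasSize(1);
assertThat(sparkRows.get(0).getInt(0)).isEqualTo(3);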
Created a new flink-spark-bundle module with the unit test testEqualityDeleteWritesOnSpark() in TestFlinkSpark.java.
Procedures for testEqualityDeleteWritesOnSpark():
Current issues: running into errors with the imports, as well as "Invalid write distribution mode: range. Need to define sort order or partition spec."
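One possible way to address the "Invalid write distribution mode: range" error, sketched under the assumption that the test can update the Iceberg Table handle before writing: either give the table a sort order (so range distribution has something to range by) or switch the write distribution mode; which option fits depends on what the test is meant to exercise.

// Option 1: define a sort order so the range distribution mode is valid
table.replaceSortOrder().asc("id").commit();

// Option 2: switch the write distribution mode away from range
table.updateProperties()
    .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.NONE.modeName())
    .commit();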